package com.bw.gmall.realtime.dws.app;

import com.bw.gmall.realtime.common.base.BaseSqlApp;
import com.bw.gmall.realtime.common.constant.Constant;
import com.bw.gmall.realtime.common.util.SQLUtil;
import com.bw.gmall.realtime.dws.function.SplitFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/*
 * 12.1 Traffic domain: search-keyword-granularity page-view per-window summary table.
 */

/**
 * Reads DWD page-log events from Kafka, keeps only search-keyword records,
 * splits each search phrase into individual keywords via the {@link SplitFunction}
 * UDTF, counts keywords in 5-second tumbling event-time windows, and writes the
 * windowed aggregate to Doris.
 */
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSqlApp {
    public static void main(String[] args) {
        // Arguments follow the BaseSqlApp.start convention used across the project
        // (app id / parallelism / port) — see BaseSqlApp for the exact semantics.
        new DwsTrafficSourceKeywordPageViewWindow()
                .start(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW, 4, 10021);
    }

    @Override
    public void handle(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String groupId) {
        // 1. Register the page-log Kafka source. Event time `et` is derived from the
        //    epoch-millis `ts` column, with a 5-second out-of-orderness watermark.
        //    FIX: the original passed the topic name as the Kafka consumer-group id;
        //    use the groupId supplied by the framework instead.
        tableEnv.executeSql("create table page_log(\n" +
                " page map<String,String>,\n" +
                " common map<String,String>,\n" +
                " ts bigint,\n" +
                " et as to_timestamp_ltz(ts, 3), " +
                " watermark for et as et - interval '5' second " +
                ")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));
//        tableEnv.sqlQuery("select * from page_log").execute().print();

        // 2. Keep only rows produced by a keyword search. The null check guards the
        //    table function in step 3 from receiving a null search phrase.
        Table searchTable = tableEnv.sqlQuery("select \n" +
                "`page`['item'] item,\n" +
                "et\n" +
                "from page_log\n" +
                "where `page`['last_page_id'] = 'search'\n" +
                "and `page`['item_type'] = 'keyword'\n" +
                "and `page`['item'] is not null");

        tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
        tableEnv.createTemporaryView("word", searchTable);
//        tableEnv.sqlQuery("select * from word").execute().print();

        // 3. Explode each search phrase into one row per keyword (lateral table join
        //    against the SplitFunction UDTF, which emits a `word` column).
        Table keywordTable = tableEnv.sqlQuery("select \n" +
                "item,\n" +
                "word,\n" +
                "et\n" +
                "from word,lateral table(SplitFunction(item))");

        tableEnv.createTemporaryView("words", keywordTable);
//        tableEnv.sqlQuery("select * from words").execute().print();

        // 4. Count each keyword per 5-second tumbling event-time window.
        //    stt/edt are the window bounds; cur_date is the processing date used as
        //    the Doris partition key.
        Table windowAggTable = tableEnv.sqlQuery("SELECT\n" +
                "  date_format(TUMBLE_START(et, interval '5' second),'yyyy-MM-dd HH:mm:ss') AS stt,\n" +
                "  date_format(TUMBLE_END(et, interval '5' second),'yyyy-MM-dd HH:mm:ss') AS edt,\n" +
                "  date_format(now() , 'yyyyMMdd') cur_date, " +
                "  word keyword,\n" +
                "  count(*) keyword_count\n" +
                "FROM words\n" +
                "GROUP BY\n" +
                "  TUMBLE(et, interval '5' second ),\n" +
                "  word");
//        windowAggTable.execute().print();

        // 5. Register the Doris sink table.
        //    NOTE(review): the DDL table name below is assumed to equal
        //    Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW used in step 6 — verify.
        tableEnv.executeSql("create table dws_traffic_source_keyword_page_view_window (\n" +
                "stt STRING,\n" +
                "edt STRING,\n" +
                "cur_date STRING,\n" +
                "keyword STRING,\n" +
                "keyword_count BIGINT\n" +
                ")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));

        // 6. Stream the windowed aggregate into the Doris sink (the original comment
        //    said "write to Kafka", but the sink created in step 5 is Doris).
        windowAggTable.insertInto(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW).execute();

    }
}
